package com.parkingwang.learning.observertype;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.concurrent.TimeUnit;

/**
 * Demonstrates a {@link PublishSubject} used as a bridge between one source
 * {@link Observable} and several consumers that subscribe at different times.
 *
 * <p>A subject is both an observer (it can be subscribed to a source) and an
 * observable (consumers can subscribe to it). The first consumer is attached
 * right away; the second one is attached roughly {@value #PAUSE_MS} ms later and
 * prints with the {@code s2=} prefix from then on.
 */
public class SubjectProcessor {

    /** Period of the ticking source, in milliseconds. */
    private static final long TICK_PERIOD_MS = 10;

    /** How long {@code main} waits after attaching each consumer, in milliseconds. */
    private static final long PAUSE_MS = 100;

    /**
     * Runs the demo: wires the ticking source into a serialized subject, attaches
     * two printing consumers {@value #PAUSE_MS} ms apart, then returns.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        Consumer<Long> subscriber1 = value -> System.out.println("s1=" + value);
        Consumer<Long> subscriber2 = value -> System.out.println("s2=" + value);

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
                // Tie the inner interval subscription to the emitter so that disposing
                // the outer subscription also stops the timer, and forward errors and
                // completion instead of silently dropping them.
                emitter.setDisposable(
                        Observable.interval(TICK_PERIOD_MS, TimeUnit.MILLISECONDS)
                                .take(Integer.MAX_VALUE)
                                .subscribe(emitter::onNext, emitter::onError, emitter::onComplete));
            }
        }).observeOn(Schedulers.newThread());

        // A subject is both an observer and an observable.
        // It is not thread-safe by itself; wrap it with toSerialized() when
        // thread-safety is required.
        Subject<Long> subject = PublishSubject.create();
        Subject<Long> longSubject = subject.toSerialized();

        // The subject acts as the observer of the ticking source...
        observable.subscribe(longSubject);

        // ...and as the observable for the printing consumers.
        longSubject.subscribe(subscriber1);
        pause(PAUSE_MS);

        longSubject.subscribe(subscriber2);
        pause(PAUSE_MS);
    }

    /**
     * Blocks the calling thread for the given time. If the thread is interrupted
     * while waiting, the stack trace is printed and the interrupt flag is restored
     * so that callers further up can still observe the interruption.
     *
     * @param millis time to sleep, in milliseconds
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

}
